Author: Aki Ariga
R Pubs doc: https://rpubs.com/chezou/usflights-en
Raw data sources
In order to use the data above, download it and place it into HDFS or S3.
Then create the table definitions.
As of dplyr_0.5.0.9000, the tbl() function does not deal gracefully with Hive tables in a schema other than default. The recommended workaround is to wrap a query in the sql() function and pass it to tbl(), for example:
airports <- tbl(sc, sql("SELECT * FROM airlines.airports"))
However, the data frame created this way has a different type, which causes the gcIntermediate function used below to fail. This notebook therefore assumes that the airlines and airports tables are in the default database.
-- Work in the default database, then register the existing Parquet files
-- under /user/centos/data/airlines_parquet as the external table `airlines`.
-- The column definitions are taken from the referenced Parquet data file.
USE default;

CREATE EXTERNAL TABLE IF NOT EXISTS airlines
  LIKE PARQUET '/user/centos/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq'
  STORED AS PARQUET
  LOCATION '/user/centos/data/airlines_parquet';
-- Raw airports data: an external, comma-delimited text table over the files
-- in hdfs:///user/centos/data/airports. String values are stored as-is here;
-- the double quotes around them are stripped when `airports` is built from
-- this table.
-- TBLPROPERTIES keys and values must be quoted string literals;
-- "skip.header.line.count"="1" skips the header line of each file.
CREATE EXTERNAL TABLE IF NOT EXISTS airports_raw
(iata STRING, airport STRING, city STRING, state STRING, country STRING, latitude FLOAT, longitude FLOAT )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///user/centos/data/airports'
TBLPROPERTIES ("skip.header.line.count"="1");
-- Build the Parquet-backed `airports` table from airports_raw, removing the
-- double-quote characters from every string column. Latitude and longitude
-- are copied unchanged.
CREATE TABLE airports STORED AS PARQUET AS
SELECT
  regexp_replace(iata, '"', '')    AS iata,
  regexp_replace(airport, '"', '') AS airport,
  regexp_replace(city, '"', '')    AS city,
  regexp_replace(state, '"', '')   AS state,
  regexp_replace(country, '"', '') AS country,
  latitude,
  longitude
FROM airports_raw;
-- Disabled row filter, kept for reference (presumably an alternative way to
-- drop the CSV header row; confirm before re-enabling):
--- WHERE iata != '"iata"'
In [ ]:
# Load libraries
library(ggplot2)
library(maps)
library(geosphere)
library(sparklyr)
library(dplyr);
In [ ]:
# Configure cluster
# Spark installation to connect to. The Spark 1.6 settings are kept for
# reference.
# spark_home <- "/opt/cloudera/parcels/CDH/lib/spark"
# spark_version <- "1.6.2"
spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
spark_version <- "2.0.0"

# Start from sparklyr's default configuration and override resource settings
# and the gateway port.
config <- spark_config()
config[["spark.driver.cores"]] <- 4
config[["spark.executor.cores"]] <- 4
config[["spark.executor.memory"]] <- "4G"
config[["sparklyr.gateway.port"]] <- 8877

# Open the connection; `sc` is used by the table lookups below.
sc <- spark_connect(
  master = "yarn-client",
  version = spark_version,
  config = config,
  spark_home = spark_home
)
In [ ]:
# Reference the table named "airlines" through the Spark connection and
# preview its first rows. Earlier variants are kept for reference:
# airlines <- tbl(src=sc, "airlines")
# airlines <- tbl(src=sc, sql("select * FROM airlines.airlines"))
airlines <- sc %>% tbl("airlines")
head(airlines)
In [ ]:
# Count the flights per year and bring the result into local memory.
airline_counts_by_year <- airlines %>%
  group_by(year) %>%
  summarise(count = n()) %>%
  collect()

# Print every row of the result rather than the default truncated view.
airline_counts_by_year %>%
  tbl_df() %>%
  print(n = nrow(.))
In [ ]:
# Line chart of the number of flights per year.
g <- ggplot(airline_counts_by_year, aes(x = year, y = count)) +
  geom_line(colour = "magenta", linetype = 1, size = 0.8) +
  xlab("Year") +
  ylab("Flight number") +
  ggtitle("US flights")
plot(g)
In [ ]:
# Count the flights per (year, month) for 2001 through 2003 and bring the
# result into local memory.
airline_counts_by_month <- airlines %>%
  filter(year >= 2001 & year <= 2003) %>%
  group_by(year, month) %>%
  summarise(count = n()) %>%
  collect()

# Line chart of the monthly counts. The x value is the first day of each
# month, built from the year and month columns. The columns are referenced by
# bare name inside aes() so they are resolved from the plot's data rather than
# from the global data frame via `$`.
g <- ggplot(
  airline_counts_by_month,
  aes(x = as.Date(sprintf("%d-%02d-01", year, month)), y = count)
) +
  geom_line(colour = "magenta", linetype = 1, size = 0.8) +
  xlab("Year/Month") +
  ylab("Flight number") +
  ggtitle("US flights")
plot(g)
In [ ]:
# Count the flights for every (year, carrier, origin, dest) combination,
# bring the result into local memory, and preview it.
flights <- airlines %>%
  group_by(year, carrier, origin, dest) %>%
  summarise(count = n()) %>%
  collect()
head(flights)
In [ ]:
# Bring the whole airports table into local memory.
# Variant for a non-default schema, kept for reference:
# airports <- tbl(sc, sql("select * FROM airlines.airports")) %>% collect
airports <- collect(tbl(sc, "airports"))

# Keep only carrier AA's rows for 2007, ordered by ascending flight count.
flights_aa <- flights %>%
  filter(year == 2007, carrier == "AA") %>%
  arrange(count)
head(flights_aa)
Flight visualization code is taken from this article: http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/
In [ ]:
# Draw a map with one great-circle line per AA route (a row of flights_aa).

# Plot limits handed to map().
xlim <- c(-171.738281, -56.601563)
ylim <- c(12.039321, 71.856229)

# Color settings: a 100-step palette from dark grey through white to blue.
# A route's color index grows with its flight count relative to the maximum.
# require('RColorBrewer')
# colors <- brewer.pal(11,'RdYlBu')
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

map("world", col = "#6B6363", fill = TRUE, bg = "#000000", lwd = 0.05,
    xlim = xlim, ylim = ylim)

n_routes <- nrow(flights_aa)
if (n_routes > 0) {
  maxcnt <- max(flights_aa$count)

  # Look up each route's endpoints once. match() returns the first airport
  # row with that code, or NA when the code is not in `airports`.
  origin_idx <- match(flights_aa$origin, airports$iata)
  dest_idx <- match(flights_aa$dest, airports$iata)

  # seq_len() gives an empty sequence for zero rows (1:0 would not).
  for (j in seq_len(n_routes)) {
    air1 <- airports[origin_idx[j], ]
    air2 <- airports[dest_idx[j], ]
    from <- c(air1$longitude, air1$latitude)
    to <- c(air2$longitude, air2$latitude)

    # Skip routes whose endpoints cannot be located instead of passing NA
    # coordinates to gcIntermediate().
    if (anyNA(from) || anyNA(to)) {
      next
    }

    inter <- gcIntermediate(from, to, n = 100, addStartEnd = TRUE)

    # Clamp to 1: rounding can yield 0 for low-count routes, and colors[0]
    # is an empty vector rather than a color.
    colindex <- max(1, round((flights_aa$count[j] / maxcnt) * length(colors)))
    lines(inter, col = colors[colindex], lwd = 0.8)
  }
}
In [ ]: